home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
Pascal Super Library
/
Pascal Super Library (CW International)(1997).bin
/
DELPHI32
/
SYS_TOOL
/
MULTI020
/
MPIPES.PAS
< prev
next >
Wrap
Pascal/Delphi Source File
|
1993-09-08
|
8KB
|
272 lines
unit MPipes;
interface
uses Multi;
{ Multitasking pipes using #Semaphore#s. }
type
ca = array[0..65533] of char;
{ Array of chars laid over raw memory so that single characters of the
pipe buffer (and of untyped parameters) can be addressed by index }
pPipe = ^tPipe;
{ Pointer to a pipe object }
tPipe = object
{ A pipe is an object one task only writes to and one task only reads from.
If the writing task wants to write more than fits into the buffer, or the
reading task wants to read more than is available, it is put asleep using
a semaphore until the request can be performed. }
buf : ^ca;
{ This is the pipe buffer }
bsize : word;
{ The size of the buffer in bytes }
head,
{ The head index in the pipe ring buffer; Put stores the next
character here }
tail : word;
{ The tail index in the pipe ring buffer; Get and Peek read the next
character from here }
ba : word;
{ The number of bytes currently held in the pipe buffer }
readsema : Semaphore;
{ Tasks reading more than is in the pipe are #WaitFor#ed using
this semaphore }
writesema : Semaphore;
{ Tasks writing more than there is space for in the pipe are
#WaitFor#ed using this semaphore }
InTasks, OutTasks : word;
{ Count the tasks registered through NewInputTask (InTasks) and
NewOutputTask (OutTasks); decremented by NoMoreInput and
NoMoreOutput respectively }
HasRead, HasWritten : boolean;
{ Track activities to pipe: HasWritten is set by NewInputTask,
HasRead is set by NewOutputTask }
constructor Init(size : word);
{ Initialize a pipe with 'size' bytes buffer space.
The pipe will work even with buffer size 1, but it will be a little
slower than necessary due to the task management overhead. }
destructor Done;
{ #Kamikaze# all waiting tasks of #readsema# and #writesema#
and free the buffer }
function Put(ch : char) : boolean;
{ Put ch into the pipe, #WaitFor#ing the current task if the buffer
is full. If #tTask.HasExit# is TRUE and #tTask.Poisoned# is TRUE,
Put returns TRUE, the task has to de-init and terminate. }
function PutBin(const m; count : word) : boolean;
{ Put the first 'count' bytes from 'm' into the pipe, #WaitFor#ing
the current task if the buffer is full.
If #tTask.HasExit# is TRUE and #tTask.Poisoned# is TRUE,
PutBin returns TRUE, the task has to de-init and terminate. }
function PutS(const s : string) : boolean;
{ Put s into the pipe as-is (length byte followed by the characters),
can be read by GetS, #WaitFor#ing the current task if the buffer
is full.
If #tTask.HasExit# is TRUE and #tTask.Poisoned# is TRUE,
PutS returns TRUE, the task has to de-init and terminate. }
function WriteLn(const s : string) : boolean;
{ Put s into the pipe like WriteLn puts it on the screen,
(i.e. followed by #13#10), #WaitFor#ing the current task
if the buffer is full.
If #tTask.HasExit# is TRUE and #tTask.Poisoned# is TRUE,
WriteLn returns TRUE, the task has to de-init and terminate. }
function WaitUntilEmpty : boolean;
{ Wait until the reading task has emptied the pipe buffer.
If #tTask.HasExit# is TRUE and #tTask.Poisoned# is TRUE,
WaitUntilEmpty returns TRUE, the task has to de-init and terminate. }
function Peek(var ch : char) : boolean;
{ If there is at least one character in the pipe, copy it into ch
without removing it from the pipe and return TRUE, otherwise
return FALSE and leave ch untouched. }
function Get : char;
{ Read one char from the pipe, #WaitFor#ing the current task if the
buffer is empty.
Note:
If #tTask.HasExit# is TRUE and #tTask.Poisoned# is TRUE,
the value returned by Get is not meaningful, but Get returns.
So check #tTask.Poisoned# ! }
function GetBin(var m; count : word) : boolean;
{ Read 'count' bytes from the pipe and write them to 'm'.
#WaitFor# the current task if the buffer is empty.
If GetBin returns TRUE, deinitialize and terminate.
Note: GetBin can only return TRUE if #tTask.HasExit# is TRUE. }
function GetS(var s : string) : boolean;
{ Read s from the pipe as-is, written by PutS, #WaitFor#ing
the current task if the buffer is empty.
If #tTask.HasExit# is TRUE and #tTask.Poisoned# is TRUE,
GetS returns TRUE, the task has to de-init and terminate. }
function ReadLn(var s : string) : boolean;
{ Read s from the pipe up to a terminating #13, #WaitFor#ing
the current task if the buffer is empty. A #10 found at the
front of s (left over from the preceding #13#10) is removed.
If #tTask.HasExit# is TRUE and #tTask.Poisoned# is TRUE,
ReadLn returns TRUE, the task has to de-init and terminate. }
procedure NewInputTask;
{ Tell the pipe that there is a new input task }
procedure NewOutputTask;
{ Tell the pipe that there is a new output task }
procedure NoMoreInput;
{ The inputting task may call this if it shuts down. The outputting
task is then terminated. }
procedure NoMoreOutput;
{ The outputting task may call this if it shuts down. The inputting
task is then terminated. }
end;
implementation
constructor tPipe.Init(size : word);
{ Prepare an empty pipe whose ring buffer holds 'size' bytes.
  size - number of bytes to allocate for the buffer. }
begin
  { no task has registered with the pipe yet }
  HasRead    := False;
  HasWritten := False;
  InTasks    := 0;
  OutTasks   := 0;

  { allocate the ring buffer and mark it as empty }
  GetMem(buf, size);
  bsize := size;
  ba    := 0;
  head  := 0;
  tail  := 0;

  { one semaphore for blocked readers, one for blocked writers }
  InitSemaphore(readsema);
  InitSemaphore(writesema)
end;
destructor tPipe.Done;
{ Shut the pipe down: Kamikaze both semaphores, keep calling Switch until
  it returns TRUE or both task counters have dropped to zero, then release
  the buffer memory.
  NOTE(review): Kamikaze and Switch come from unit Multi; presumably they
  wake/terminate the waiting tasks and yield to other tasks - confirm there. }
begin
  Kamikaze(readsema);
  Kamikaze(writesema);

  { Switch is called first on every round; the counters are only looked at
    when Switch returned FALSE }
  while not Switch do
    if (InTasks = 0) and (OutTasks = 0) then break;

  FreeMem(buf, bsize)
end;
procedure tPipe.NoMoreInput;
{ Unregister one input task. When that was the last input task and output
  tasks are still registered, the pipe is shut down via Done. }
begin
  Dec(InTasks);
  if InTasks <> 0 then exit;      { other input tasks remain }
  if OutTasks <> 0 then Done      { last writer gone, readers still attached }
end;
procedure tPipe.NoMoreOutput;
{ Unregister one output task. When that was the last output task and input
  tasks are still registered, the pipe is shut down via Done. }
begin
  Dec(OutTasks);
  if OutTasks <> 0 then exit;     { other output tasks remain }
  if InTasks <> 0 then Done       { last reader gone, writers still attached }
end;
procedure tPipe.NewInputTask;
{ Register one more task that feeds data into the pipe. }
begin
  { remember that an input task has been attached at least once;
    Get uses this flag together with InTasks = 0 to give up waiting }
  HasWritten := True;
  Inc(InTasks)
end;
procedure tPipe.NewOutputTask;
{ Register one more task that takes data out of the pipe. }
begin
  { remember that an output task has been attached at least once;
    Put uses this flag together with OutTasks = 0 to give up waiting }
  HasRead := True;
  Inc(OutTasks)
end;
function tPipe.Put(ch : char) : boolean;
{ Append ch to the ring buffer.
  While the buffer is full the calling task waits on writesema.
  Returns FALSE when ch was stored.
  Returns TRUE (and sets t^.Poisoned) when no output task is left although
  one had been registered, or when WaitFor reports TRUE; ch is then NOT
  stored and the caller has to de-init and terminate.
  NOTE(review): t is taken from unit Multi - presumably the current task. }
begin
  Put := false;
  while ba = bsize do
    if ((outtasks = 0) and HasRead) or WaitFor(t^,writesema) then begin
      t^.Poisoned := true;
      Put := true;
      exit
    end;
  { store the character and advance the head index, wrapping at the end }
  buf^[head] := ch;
  inc(head);
  if head >= bsize then head := 0;
  inc(ba);
  { the buffer was empty before this Put: let a waiting reader continue }
  if ba = 1 then Release(readsema)
end;
function tPipe.PutBin(const m; count : word) : boolean;
{ Write the first 'count' bytes found at 'm' into the pipe, one Put each.
  Returns FALSE when all bytes were written, TRUE as soon as a Put
  returns TRUE (the remaining bytes are then not written). }
var
  idx : word;
  src : ca absolute m;   { view the untyped parameter as a char array }
begin
  PutBin := true;        { assume failure until every byte is through }
  idx := 0;
  while idx < count do begin
    if Put(src[idx]) then exit;
    inc(idx)
  end;
  PutBin := false
end;
function tPipe.PutS(const s : string) : boolean;
{ Write s into the pipe in its in-memory form: the length byte s[0]
  followed by length(s) characters, so that GetS can read it back.
  Returns FALSE on success, TRUE as soon as a Put returns TRUE. }
begin
  { s[0]..s[length(s)] are length(s)+1 consecutive bytes starting at s;
    PutBin sends them through Put one by one and stops at the first TRUE }
  PutS := PutBin(s, length(s) + 1)
end;
function tPipe.WriteLn(const s : string) : boolean;
{ Write the characters of s (without the length byte) followed by #13#10.
  Returns FALSE when everything was written, TRUE as soon as one of the
  writes returns TRUE; nothing further is written after that point.
  The early exit is spelled out so that it does not depend on the
  compiler's short-circuit boolean evaluation setting. }
begin
  WriteLn := true;
  if PutBin(s[1],length(s)) then exit;
  if Put(#13) then exit;
  if Put(#10) then exit;
  WriteLn := false
end;
function tPipe.WaitUntilEmpty : boolean;
{ Call Switch repeatedly until the pipe buffer holds no more bytes.
  Returns FALSE once the buffer is empty, TRUE as soon as Switch returns
  TRUE (the task then has to de-init and terminate).
  Switch is always called at least once, even if the buffer is already
  empty on entry. }
begin
  WaitUntilEmpty := false;
  repeat
    if Switch then begin
      WaitUntilEmpty := true;
      exit
    end
  { test the byte count, not head = tail: in a ring buffer the two indices
    are also equal when the buffer is completely full }
  until ba = 0
end;
function tPipe.Peek(var ch : char) : boolean;
{ Non-blocking look at the next character.
  Returns TRUE and copies the character at the tail into ch when the pipe
  holds data; the character stays in the pipe.
  Returns FALSE and leaves ch untouched when the pipe is empty. }
begin
  Peek := false;
  if ba = 0 then exit;     { nothing buffered }
  ch := buf^[tail];
  Peek := true
end;
function tPipe.Get : char;
{ Remove and return the next character from the ring buffer.
  While the buffer is empty the calling task waits on readsema.
  When no input task is left although one had been registered, or when
  WaitFor reports TRUE, t^.Poisoned is set and #0 is returned without
  touching the buffer - callers must check t^.Poisoned to tell this apart
  from a real #0 character.
  NOTE(review): t is taken from unit Multi - presumably the current task. }
begin
  Get := #0;   { defined result for the poisoned exit path }
  while ba = 0 do { buffer empty }
    if (haswritten and (intasks = 0)) or WaitFor(t^,readsema) then begin
      t^.Poisoned := true;
      exit
    end;
  { fetch the character and advance the tail index, wrapping at the end }
  Get := buf^[tail];
  inc(tail);
  if tail >= bsize then tail := 0;
  dec(ba);
  { the buffer was full before this Get: let a waiting writer continue }
  if ba = bsize-1 then Release(writesema)
end;
function tPipe.GetBin(var m; count : word) : boolean;
{ Read 'count' bytes from the pipe into the memory at 'm', one Get each.
  Returns FALSE when all bytes were read, TRUE as soon as t^.Poisoned is
  found set after a Get (the byte slot written last is then not valid and
  the remaining bytes are not read). }
var
  idx : word;
  dest : ca absolute m;   { view the untyped parameter as a char array }
begin
  GetBin := true;         { assume failure until every byte has arrived }
  idx := 0;
  while idx < count do begin
    dest[idx] := Get;
    if t^.Poisoned then exit;
    inc(idx)
  end;
  GetBin := false
end;
function tPipe.GetS(var s : string) : boolean;
{ Read a string in the form PutS wrote it: first the length byte, then
  that many characters.
  Returns FALSE when the whole string was read, TRUE as soon as
  t^.Poisoned is found set after a Get (s is then only partly filled). }
var
  k, len : byte;
begin
  GetS := true;

  { the first byte in the pipe is the length byte }
  s[0] := Get;
  if t^.Poisoned then exit;

  len := ord(s[0]);
  for k := 1 to len do begin
    s[k] := Get;
    if t^.Poisoned then exit
  end;

  GetS := false
end;
function tPipe.ReadLn(var s : string) : boolean;
{ Read one line from the pipe into s.
  Characters are collected until a #13 arrives; the #13 itself is not
  stored. A #10 at the front of the result (left in the pipe by the
  #13#10 of the preceding line) is removed.
  Lines longer than MaxLen characters are truncated: the excess characters
  are still consumed from the pipe but dropped, instead of being written
  past the end of s.
  Returns FALSE when a line was read, TRUE as soon as t^.Poisoned is found
  set after a Get (s then holds the characters read so far). }
const
  MaxLen = 255;   { capacity of a Pascal short string }
var
  ch : char;
begin
  ReadLn := true;
  s := '';
  repeat
    ch := Get;
    if t^.Poisoned then exit;
    if ch = #13 then break;
    if length(s) < MaxLen then begin
      inc(s[0]);
      s[length(s)] := ch
    end
  until false;
  { only look at s[1] when it actually belongs to the string }
  if length(s) > 0 then
    if s[1] = #10 then delete(s,1,1);
  ReadLn := false
end;
end.